/*
 * Copyright 2024 Redpanda Data, Inc.
 *
 * Licensed as a Redpanda Enterprise file under the Redpanda Community
 * License (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
 */
#pragma once

#include "base/outcome.h"
#include "base/vlog.h"
#include "bytes/iobuf.h"
#include "cloud_io/provider.h"
#include "cloud_io/remote.h"
#include "cloud_storage_clients/types.h"
#include "iceberg/logger.h"
#include "iceberg/uri.h"
#include "ssx/future-util.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/future.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/noncopyable_function.hh>

namespace iceberg {

using namespace std::chrono_literals;

class metadata_io {
public:
    /// Error codes reported by metadata_io operations in place of a value.
    enum class errc {
        /// The operation ran out of time. Callers may retry if they are
        /// allowed to.
        timedout = 1,

        /// The operation failed in a way that does not look retriable.
        failed = 2,

        /// The operation was abandoned because the system is shutting down.
        shutting_down = 3,

        /// The supplied URI was malformed.
        invalid_uri = 4,
    };
    /// Builds a metadata_io that operates on bucket `b` through `io`.
    ///
    /// \param io remote used for the object storage calls made by this class.
    ///           Only a reference is kept (io_), so `io` must outlive this
    ///           object.
    /// \param b  bucket targeted by the operations of this object; moved into
    ///           bucket_.
    ///
    /// The URI converter is initialised from io.provider().
    metadata_io(cloud_io::remote& io, cloud_storage_clients::bucket_name b)
      : io_(io)
      , bucket_(std::move(b))
      , uri_converter_(io.provider()) {}
    // Defaulted: io_ is a non-owning reference, the other members clean up
    // after themselves.
    ~metadata_io() = default;

public:
    /// Translates an object storage path into the URI form used across the
    /// Iceberg ecosystem, e.g. path/to/file => s3://bucket/path/to/file.
    /// Such URIs are what gets written into Iceberg catalog and manifest
    /// files, and what 3rd party tools later use to read the data.
    uri to_uri(const std::filesystem::path& path) const {
        auto converted = uri_converter_.to_uri(bucket_, path);
        return converted;
    }

    /// Counterpart of to_uri: turns a URI back into the path that the
    /// redpanda io abstraction works with.
    /// E.g. s3://bucket/path/to/file => path/to/file
    /// When the URI converter yields no path for this bucket and URI, a
    /// warning is logged and errc::invalid_uri is returned.
    checked<std::filesystem::path, metadata_io::errc>
    from_uri(const uri& uri) const {
        auto maybe_path = uri_converter_.from_uri(bucket_, uri);
        if (maybe_path) {
            return std::filesystem::path(maybe_path.value());
        }
        vlog(log.warn, "Failed to construct path from URI: {}", uri);
        return errc::invalid_uri;
    }

    /// Deletes the given files from bucket_ in object storage.
    ///
    /// \param files_to_delete paths of the objects to remove. Taken by value;
    ///        the paths are moved out of it to build the object keys.
    /// \param retry_parent retry chain node passed to the remote for this
    ///        deletion.
    /// \return std::nullopt on success, otherwise
    ///   - errc::shutting_down if the remote reports cancellation or throws
    ///     a shutdown exception,
    ///   - errc::timedout if the remote reports a timeout,
    ///   - errc::failed for any other reported failure or thrown exception.
    ss::future<checked<std::nullopt_t, metadata_io::errc>> delete_files(
      chunked_vector<std::filesystem::path> files_to_delete,
      retry_chain_node& retry_parent) {
        chunked_vector<cloud_storage_clients::object_key> keys;
        keys.reserve(files_to_delete.size());
        // This call owns files_to_delete, so each path can be moved into its
        // key instead of being copied first.
        for (auto& path : files_to_delete) {
            keys.push_back(cloud_storage_clients::object_key{std::move(path)});
        }
        // as_future turns a thrown exception into a failed future so that it
        // can be mapped to an errc below instead of propagating. The last
        // argument is a callback that ignores its size_t argument.
        auto delete_fut = co_await ss::coroutine::as_future(io_.delete_objects(
          bucket_, std::move(keys), retry_parent, [](size_t) {}));
        if (delete_fut.failed()) {
            auto eptr = delete_fut.get_exception();
            auto is_shutdown = ssx::is_shutdown_exception(eptr);
            // Shutdown exceptions are logged at debug level, everything else
            // at warn.
            auto log_lvl = is_shutdown ? ss::log_level::debug
                                       : ss::log_level::warn;
            vlogl(
              log, log_lvl, "Exception thrown while deleting files: {}", eptr);
            co_return is_shutdown ? metadata_io::errc::shutting_down
                                  : metadata_io::errc::failed;
        }
        switch (delete_fut.get()) {
            using enum cloud_io::upload_result;
        case success:
            co_return std::nullopt;
        case cancelled:
            co_return errc::shutting_down;
        case timedout:
            co_return errc::timedout;
        case failed:
            co_return errc::failed;
        }
    }

protected:
    /// Downloads the object stored at `path` in bucket_ and parses its bytes.
    ///
    /// \tparam T type produced by `parse`.
    /// \param path key of the object within bucket_. Held by reference and
    ///        read again after the download completes (for logging), so it
    ///        must stay alive until the returned future resolves.
    /// \param display_str name passed to the remote and used in log
    ///        messages. It is a view that is read after the download
    ///        completes, so the underlying string must stay alive as well.
    /// \param parse turns the downloaded bytes into a T. Any exception it
    ///        throws is logged and reported as errc::failed.
    /// \return the parsed value, otherwise
    ///   - errc::shutting_down if the download throws a shutdown exception,
    ///   - errc::timedout if the download times out,
    ///   - errc::failed for any other exception or failure, including the
    ///     object not being found.
    template<typename T>
    ss::future<checked<T, errc>> download_object(
      const std::filesystem::path& path,
      std::string_view display_str,
      ss::noncopyable_function<T(iobuf)> parse) {
        // Every call gets its own retry chain node, built from io_.as() with
        // a 30s duration, a 10ms value and the backoff strategy (presumably
        // the overall timeout and the initial backoff -- confirm against
        // retry_chain_node).
        retry_chain_node retry(
          io_.as(),
          ss::lowres_clock::duration{30s},
          10ms,
          retry_strategy::backoff);
        // Buffer handed to the remote as the payload; it is passed to `parse`
        // once the download succeeded.
        iobuf buf;
        auto res = co_await ss::coroutine::as_future(io_.download_object({
          .transfer_details = {
            .bucket = bucket_,
            .key = cloud_storage_clients::object_key{path},
            .parent_rtc = retry,
          },
          .display_str = display_str,
          .payload = buf,
        }));
        if (res.failed()) {
            const auto ex = res.get_exception();
            const auto msg = fmt::format(
              "Exception while performing {} download: {}", display_str, ex);
            // Shutdown exceptions are logged at debug level only.
            if (ssx::is_shutdown_exception(ex)) {
                vlog(log.debug, "{}", msg);
                co_return errc::shutting_down;
            }
            vlog(log.error, "{}", msg);
            co_return errc::failed;
        }
        switch (res.get()) {
            using enum cloud_io::download_result;
        case success:
            // Fallthrough to handle the bytes below.
            break;
        case timedout:
            co_return errc::timedout;
        case notfound:
            // Not found is not a retriable error.
            [[fallthrough]];
        case failed:
            co_return errc::failed;
        }
        // A parse failure is reported as an error code rather than being
        // propagated as an exception.
        try {
            co_return parse(std::move(buf));
        } catch (...) {
            auto ex = std::current_exception();
            vlog(
              log.error,
              "Exception while parsing {} (path: {}): {}",
              display_str,
              path,
              ex);
            co_return errc::failed;
        }
    }

    /// Serializes `t` with `serialize` and uploads the resulting bytes to
    /// `path` in bucket_.
    ///
    /// \param display_str name passed to the remote and used in log
    ///        messages; it is read after the upload completes, so the
    ///        underlying string must stay alive until the future resolves.
    /// \return the size in bytes of the serialized payload on success,
    ///         otherwise
    ///   - errc::shutting_down if the upload is cancelled or throws a
    ///     shutdown exception,
    ///   - errc::timedout if the upload times out,
    ///   - errc::failed for any other failure or exception.
    /// Exceptions thrown by `serialize` itself are not caught here.
    template<typename T>
    ss::future<checked<size_t, errc>> upload_object(
      const std::filesystem::path& path,
      const T& t,
      std::string_view display_str,
      ss::noncopyable_function<iobuf(const T&)> serialize) {
        auto payload = serialize(t);
        // Remember the size now: the payload is moved into the request.
        auto payload_size = payload.size_bytes();
        retry_chain_node rtc(
          io_.as(),
          ss::lowres_clock::duration{30s},
          10ms,
          retry_strategy::backoff);
        auto upload_fut = co_await ss::coroutine::as_future(
          io_.upload_object({
            .transfer_details = {
              .bucket = bucket_,
              .key = cloud_storage_clients::object_key{path},
              .parent_rtc = rtc,
            },
            .display_str = display_str,
            .payload = std::move(payload),
          }));
        if (upload_fut.failed()) {
            const auto eptr = upload_fut.get_exception();
            const auto text = fmt::format(
              "Exception while performing {} upload: {}", display_str, eptr);
            const bool is_shutdown = ssx::is_shutdown_exception(eptr);
            // Shutdown is expected, so it is only logged at debug level.
            const auto level = is_shutdown ? ss::log_level::debug
                                           : ss::log_level::error;
            vlogl(log, level, "{}", text);
            co_return is_shutdown ? errc::shutting_down : errc::failed;
        }
        switch (upload_fut.get()) {
        case cloud_io::upload_result::success:
            co_return payload_size;
        case cloud_io::upload_result::timedout:
            co_return errc::timedout;
        case cloud_io::upload_result::cancelled:
            co_return errc::shutting_down;
        case cloud_io::upload_result::failed:
            co_return errc::failed;
        }
    }

    /// Checks whether an object is stored at `path` in bucket_.
    ///
    /// \param display_str name passed to the remote and used in log
    ///        messages; it is read after the check completes, so the
    ///        underlying string must stay alive until the future resolves.
    /// \return true if the object exists, false if it was not found,
    ///         otherwise
    ///   - errc::shutting_down if the check throws a shutdown exception,
    ///   - errc::timedout if the check times out,
    ///   - errc::failed for any other failure or exception.
    ss::future<checked<bool, metadata_io::errc>> object_exists(
      const std::filesystem::path& path, std::string_view display_str) {
        retry_chain_node rtc(
          io_.as(),
          ss::lowres_clock::duration{30s},
          10ms,
          retry_strategy::backoff);
        auto exists_fut = co_await ss::coroutine::as_future(io_.object_exists(
          bucket_, cloud_storage_clients::object_key{path}, rtc, display_str));
        if (exists_fut.failed()) {
            const auto eptr = exists_fut.get_exception();
            const auto text = fmt::format(
              "Exception while performing {} existence check: {}",
              display_str,
              eptr);
            const bool is_shutdown = ssx::is_shutdown_exception(eptr);
            // Shutdown is expected, so it is only logged at debug level.
            const auto level = is_shutdown ? ss::log_level::debug
                                           : ss::log_level::error;
            vlogl(log, level, "{}", text);
            co_return is_shutdown ? errc::shutting_down : errc::failed;
        }
        switch (exists_fut.get()) {
        case cloud_io::download_result::success:
            co_return true;
        case cloud_io::download_result::notfound:
            co_return false;
        case cloud_io::download_result::timedout:
            co_return errc::timedout;
        case cloud_io::download_result::failed:
            co_return errc::failed;
        }
    }

protected:
    // Remote used for the object storage calls. Non-owning: bound to the
    // reference passed to the constructor.
    cloud_io::remote& io_;
    // Bucket targeted by the operations of this object; set once at
    // construction and const afterwards.
    const cloud_storage_clients::bucket_name bucket_;

private:
    // Converts between object storage paths and URIs for to_uri/from_uri;
    // constructed from io.provider().
    uri_converter uri_converter_;
};

} // namespace iceberg
